// Copyright (c) 2022-present Oceanbase Inc. All Rights Reserved.
// Author:
//   suzhi.yt <>

#define USING_LOG_PREFIX SQL_ENG

#include "sql/engine/cmd/ob_load_data_direct_impl.h"
#include "observer/omt/ob_tenant.h"
#include "observer/table_load/ob_table_load_coordinator.h"
#include "observer/table_load/ob_table_load_coordinator_ctx.h"
#include "observer/table_load/ob_table_load_service.h"
#include "observer/table_load/ob_table_load_table_ctx.h"
#include "observer/table_load/ob_table_load_task.h"
#include "observer/table_load/ob_table_load_task_scheduler.h"
#include "share/schema/ob_schema_getter_guard.h"
#include "share/ob_device_manager.h"
#include "share/backup/ob_backup_io_adapter.h"

namespace oceanbase
{
namespace sql
{
using namespace common;
using namespace observer;
using namespace share;
using namespace table;
using namespace omt;

/**
 * DataAccessParam
 */

ObLoadDataDirectImpl::DataAccessParam::DataAccessParam()
  : file_column_num_(0), file_cs_type_(CS_TYPE_INVALID)
{
}

bool ObLoadDataDirectImpl::DataAccessParam::is_valid() const
{
  return file_column_num_ > 0 && CS_TYPE_INVALID != file_cs_type_;
}

/**
 * LoadExecuteParam
 */

ObLoadDataDirectImpl::LoadExecuteParam::LoadExecuteParam()
  : tenant_id_(OB_INVALID_ID),
    database_id_(OB_INVALID_ID),
    table_id_(OB_INVALID_ID),
    sql_mode_(0),
    parallel_(0),
    batch_row_count_(0),
    data_mem_usage_limit_(0),
    need_sort_(false),
    online_opt_stat_gather_(false),
    max_error_rows_(-1),
    ignore_row_num_(-1),
    dup_action_(ObLoadDupActionType::LOAD_INVALID_MODE)
{
}

bool ObLoadDataDirectImpl::LoadExecuteParam::is_valid() const
{
  return OB_INVALID_ID != tenant_id_ && OB_INVALID_ID != database_id_ &&
         OB_INVALID_ID != table_id_ && !database_name_.empty() && !table_name_.empty() &&
         !combined_name_.empty() && parallel_ > 0 && batch_row_count_ > 0 &&
         data_mem_usage_limit_ > 0 && max_error_rows_ >= 0 && ignore_row_num_ >= 0 &&
         ObLoadDupActionType::LOAD_INVALID_MODE != dup_action_ && data_access_param_.is_valid() &&
         !store_column_idxs_.empty();
}

/**
 * LoadExecuteContext
 */

ObLoadDataDirectImpl::LoadExecuteContext::LoadExecuteContext()
  : direct_loader_(nullptr),
    job_stat_(nullptr),
    logger_(nullptr)
{
}

bool ObLoadDataDirectImpl::LoadExecuteContext::is_valid() const
{
  return nullptr != exec_ctx_ && nullptr != allocator_ && nullptr != direct_loader_ &&
         nullptr != job_stat_ && nullptr != logger_;
}

/**
 * Logger
 */

const char *ObLoadDataDirectImpl::Logger::log_file_column_names =
  "\nFile\tRow\tErrCode\tErrMsg\t\n";
const char *ObLoadDataDirectImpl::Logger::log_file_row_fmt = "%.*s\t%ld\t%d\t%s\t\n";

ObLoadDataDirectImpl::Logger::Logger()
  : is_oracle_mode_(false), buf_(nullptr), is_create_log_succ_(false), is_inited_(false)
{
}

ObLoadDataDirectImpl::Logger::~Logger()
{
  if (nullptr != buf_) {
    ob_free(buf_);
    buf_ = nullptr;
  }
}

int ObLoadDataDirectImpl::Logger::init(const ObString &load_info)
{
  int ret = OB_SUCCESS;
  if (IS_INIT) {
    ret = OB_INIT_TWICE;
    LOG_WARN("ObLoadDataDirectImpl::Logger init twice", KR(ret), KP(this));
  } else if (OB_UNLIKELY(load_info.empty())) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid args", KR(ret), K(load_info));
  } else if (OB_ISNULL(
        buf_ = static_cast<char *>(ob_malloc(DEFAULT_BUF_LENGTH, ObModIds::OB_SQL_LOAD_DATA)))) {
    ret = OB_ALLOCATE_MEMORY_FAILED;
    LOG_WARN("fail to allocate memory", KR(ret));
  } else {
    if (OB_SUCCESS != create_log_file(load_info)) {
      is_create_log_succ_ = false;
    } else {
      is_create_log_succ_ = true;
    }
    is_oracle_mode_ = lib::is_oracle_mode();
    is_inited_ = true;
  }
  return ret;
}

int ObLoadDataDirectImpl::Logger::create_log_file(const ObString &load_info)
{
  int ret = OB_SUCCESS;
  ObString file_name;
  if (OB_FAIL(generate_log_file_name(buf_, DEFAULT_BUF_LENGTH, file_name))) {
    LOG_WARN("fail to generate log file name", KR(ret));
  } else if (OB_FAIL(file_appender_.open(file_name, false, true))) {
    LOG_WARN("fail to open file", KR(ret), K(file_name));
  } else if (OB_FAIL(file_appender_.append(load_info.ptr(), load_info.length(), true))) {
    LOG_WARN("fail to append log", KR(ret));
  } else if (OB_FAIL(file_appender_.append(log_file_column_names, strlen(log_file_column_names),
                                            true))) {
    LOG_WARN("fail to append log", KR(ret));
  }
  return ret;
}

int ObLoadDataDirectImpl::Logger::generate_log_file_name(char *buf, int64_t size,
                                                         ObString &file_name)
{
  int ret = OB_SUCCESS;
  const char *dict = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
  const int dict_len = strlen(dict); // length of dict
  const char *file_prefix = "log/obloaddata.log.";
  const int64_t prefix_len = strlen(file_prefix);
  const int64_t log_file_id_len = 6;
  if (OB_UNLIKELY(prefix_len + log_file_id_len > size)) {
    ret = OB_SIZE_OVERFLOW;
    LOG_WARN("log file name buf overflow", KR(ret), K(size));
  } else {
    const int64_t cur_ts = ObTimeUtil::current_time();
    uint32_t hash_ts = ::murmurhash2(&cur_ts, sizeof(cur_ts), 0);
    // copy prefix
    MEMCPY(buf, file_prefix, prefix_len);
    // generate file id
    char *id_buf = buf + prefix_len;
    for (int64_t i = 0; i < log_file_id_len; ++i) {
      id_buf[i] = dict[hash_ts % dict_len];
      hash_ts /= dict_len;
    }
    // assign string
    file_name.assign(buf, prefix_len + log_file_id_len);
  }
  return ret;
}

int ObLoadDataDirectImpl::Logger::log_error_line(const ObString &file_name, int64_t line_no,
                                                 int err_code)
{
  int ret = OB_SUCCESS;
  if (IS_NOT_INIT) {
    ret = OB_NOT_INIT;
    LOG_WARN("ObLoadDataDirectImpl::Logger not init", KR(ret), KP(this));
  } else if (is_create_log_succ_) {
    const char *err_msg = ob_errpkt_strerror(err_code, is_oracle_mode_);
    const int err_no = ob_errpkt_errno(err_code, is_oracle_mode_);
    int64_t pos = 0;
    lib::ObMutexGuard guard(mutex_);
    if (OB_FAIL(databuff_printf(buf_, DEFAULT_BUF_LENGTH, pos, log_file_row_fmt, file_name.length(),
                                file_name.ptr(), line_no, err_no, err_msg))) {
      LOG_WARN("fail to databuff printf", KR(ret), K(line_no), K(err_no), K(err_msg));
    } else if (OB_FAIL(file_appender_.append(buf_, pos, false))) {
      LOG_WARN("fail to append log", KR(ret), K(pos), K(line_no), K(err_no), K(err_msg));
    }
  }
  return ret;
}

/**
 * RandomFileReader
 */

ObLoadDataDirectImpl::RandomFileReader::RandomFileReader() : is_inited_(false)
{
}

ObLoadDataDirectImpl::RandomFileReader::~RandomFileReader()
{
}

int ObLoadDataDirectImpl::RandomFileReader::open(const DataAccessParam &data_access_param, const ObString &filename)
{
  int ret = OB_SUCCESS;
  UNUSED(data_access_param);
  if (IS_INIT) {
    ret = OB_INIT_TWICE;
    LOG_WARN("RandomFileReader init twice", KR(ret), KP(this));
  } else if (OB_FAIL(file_reader_.open(filename.ptr(), false))) {
    LOG_WARN("fail to open file", KR(ret), K(filename));
  } else {
    filename_ = filename;
    is_inited_ = true;
  }
  return ret;
}

int ObLoadDataDirectImpl::RandomFileReader::pread(char *buf, int64_t count, int64_t offset, int64_t &read_size)
{
  int ret = OB_SUCCESS;
  if (IS_NOT_INIT) {
    ret = OB_NOT_INIT;
    LOG_WARN("RandomFileReader not init", KR(ret), KP(this));
  } else if (OB_FAIL(file_reader_.pread(buf, count, offset, read_size))) {
    LOG_WARN("fail to pread file buf", KR(ret), K(count), K(offset), K(read_size));
  }
  return ret;
}

int ObLoadDataDirectImpl::RandomFileReader::get_file_size(int64_t &file_size)
{
  int ret = OB_SUCCESS;
  if (IS_NOT_INIT) {
    ret = OB_NOT_INIT;
    LOG_WARN("RandomFileReader not init", KR(ret), KP(this));
  } else {
    file_size = ::get_file_size(filename_.ptr());
  }
  return ret;
}

/**
 * RandomOSSReader
 */

ObLoadDataDirectImpl::RandomOSSReader::RandomOSSReader() : device_handle_(nullptr), is_inited_(false)
{
}

ObLoadDataDirectImpl::RandomOSSReader::~RandomOSSReader()
{
  if (fd_.is_valid()) {
    device_handle_->close(fd_);
    fd_.reset();
  }
  if (nullptr != device_handle_) {
    common::ObDeviceManager::get_instance().release_device(device_handle_);
    device_handle_ = nullptr;
  }
}

int ObLoadDataDirectImpl::RandomOSSReader::open(const DataAccessParam &data_access_param,
                                                const ObString &filename)
{
  int ret = OB_SUCCESS;
  ObIODOpt opt;
  ObIODOpts iod_opts;
  ObBackupIoAdapter util;
  iod_opts.opts_ = &opt;
  iod_opts.opt_cnt_ = 0;
  if (IS_INIT) {
    ret = OB_INIT_TWICE;
    LOG_WARN("RandomOSSReader init twice", KR(ret), KP(this));
  } else if (OB_FAIL(
        util.get_and_init_device(device_handle_, &data_access_param.access_info_, filename))) {
    LOG_WARN("fail to get device manager", KR(ret), K(filename));
  } else if (OB_FAIL(util.set_access_type(&iod_opts, false, 1))) {
    LOG_WARN("fail to set access type", KR(ret));
  } else if (OB_FAIL(device_handle_->open(to_cstring(filename), -1, 0, fd_, &iod_opts))) {
    LOG_WARN("fail to open oss file", KR(ret), K(filename));
  } else {
    is_inited_ = true;
  }
  return ret;
}

int ObLoadDataDirectImpl::RandomOSSReader::pread(char *buf, int64_t count, int64_t offset,
                                                 int64_t &read_size)
{
  int ret = OB_SUCCESS;
  if (IS_NOT_INIT) {
    ret = OB_NOT_INIT;
    LOG_WARN("RandomOSSReader not init", KR(ret), KP(this));
  } else if (OB_FAIL(device_handle_->pread(fd_, offset, count, buf, read_size))) {
    LOG_WARN("fail to pread oss buf", KR(ret), K(offset), K(count), K(read_size));
  }
  return ret;
}

int ObLoadDataDirectImpl::RandomOSSReader::get_file_size(int64_t &file_size)
{
  int ret = OB_SUCCESS;
  ObBackupIoAdapter util;
  if (IS_NOT_INIT) {
    ret = OB_NOT_INIT;
    LOG_WARN("RandomOSSReader not init", KR(ret), KP(this));
  } else if (OB_FAIL(util.get_file_size(device_handle_, fd_, file_size))) {
    LOG_WARN("fail to get oss file size", KR(ret), K(file_size));
  }
  return ret;
}

/**
 * SequentialDataAccessor
 */

ObLoadDataDirectImpl::SequentialDataAccessor::SequentialDataAccessor()
  : random_io_device_(nullptr), offset_(0), is_inited_(false)
{
}

ObLoadDataDirectImpl::SequentialDataAccessor::~SequentialDataAccessor()
{
}

int ObLoadDataDirectImpl::SequentialDataAccessor::init(const DataAccessParam &data_access_param,
                                                       const ObString &filename)
{
  int ret = OB_SUCCESS;
  if (IS_INIT) {
    ret = OB_INIT_TWICE;
    LOG_WARN("ObLoadDataDirectImpl::SequentialDataAccessor init twice", KR(ret), KP(this));
  } else if (OB_UNLIKELY(!data_access_param.is_valid() || filename.empty())) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid args", KR(ret), K(data_access_param), K(filename));
  } else {
    if (data_access_param.file_location_ == ObLoadFileLocation::SERVER_DISK) {
      if (OB_FAIL(random_file_reader_.open(data_access_param, filename))) {
        LOG_WARN("fail to open random file reader", KR(ret), K(filename));
      } else {
        random_io_device_ = &random_file_reader_;
      }
    } else if (data_access_param.file_location_ == ObLoadFileLocation::OSS) {
      if (OB_FAIL(random_oss_reader_.open(data_access_param, filename))) {
        LOG_WARN("fail to open random oss reader", KR(ret), K(filename));
      } else {
        random_io_device_ = &random_oss_reader_;
      }
    } else {
      ret = OB_NOT_SUPPORTED;
      LOG_WARN("not supported load file location", KR(ret), K(data_access_param.file_location_));
    }
    if (OB_SUCC(ret)) {
      is_inited_ = true;
    }
  }
  return ret;
}

int ObLoadDataDirectImpl::SequentialDataAccessor::read(char *buf, int64_t count, int64_t &read_size)
{
  int ret = OB_SUCCESS;
  if (IS_NOT_INIT) {
    ret = OB_NOT_INIT;
    LOG_WARN("ObLoadDataDirectImpl::SequentialDataAccessor not init", KR(ret), KP(this));
  } else if (OB_UNLIKELY(nullptr == buf || count <= 0)) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid args", KR(ret), KP(buf), K(count));
  } else {
    if (OB_FAIL(random_io_device_->pread(buf, count, offset_, read_size))) {
      LOG_WARN("fail to do pread", KR(ret), K(offset_));
    } else {
      offset_ += read_size;
    }
  }
  return ret;
}

int ObLoadDataDirectImpl::SequentialDataAccessor::get_file_size(int64_t &file_size)
{
  int ret = OB_SUCCESS;
  if (IS_NOT_INIT) {
    ret = OB_NOT_INIT;
    LOG_WARN("ObLoadDataDirectImpl::SequentialDataAccessor not init", KR(ret), KP(this));
  } else if (OB_FAIL(random_io_device_->get_file_size(file_size))) {
    LOG_WARN("fail to get random io device file size", KR(ret), K(file_size));
  }
  return ret;
}

/**
 * DataDescIterator
 */

ObLoadDataDirectImpl::DataDescIterator::DataDescIterator()
  : pos_(0)
{
}

ObLoadDataDirectImpl::DataDescIterator::~DataDescIterator()
{
}

int ObLoadDataDirectImpl::DataDescIterator::copy(const ObLoadFileIterator &file_iter)
{
  int ret = OB_SUCCESS;
  if (OB_UNLIKELY(!file_iter.is_valid())) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid args", KR(ret), K(file_iter));
  } else {
    ObLoadFileIterator copy_file_iter;
    DataDesc data_desc;
    int64_t file_idx = 0;
    if (OB_FAIL(copy_file_iter.copy(file_iter))) {
      LOG_WARN("fail to copy file iter", KR(ret));
    }
    while (OB_SUCC(ret)) {
      data_desc.file_idx_ = file_idx++;
      if (OB_FAIL(copy_file_iter.get_next_file(data_desc.filename_))) {
        if (OB_UNLIKELY(OB_ITER_END != ret)) {
          LOG_WARN("fail to get next file", KR(ret));
        } else {
          ret = OB_SUCCESS;
          break;
        }
      } else if (OB_FAIL(data_descs_.push_back(data_desc))) {
        LOG_WARN("fail to push back", KR(ret));
      }
    }
  }
  return ret;
}

int ObLoadDataDirectImpl::DataDescIterator::copy(const DataDescIterator &other)
{
  int ret = OB_SUCCESS;
  if (OB_UNLIKELY(0 == other.count())) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid args", KR(ret), K(other));
  } else if (OB_FAIL(data_descs_.assign(other.data_descs_))) {
    LOG_WARN("fail to assign data descs", KR(ret));
  } else {
    pos_ = 0;
  }
  return ret;
}

int ObLoadDataDirectImpl::DataDescIterator::add_data_desc(const DataDesc &data_desc)
{
  int ret = OB_SUCCESS;
  if (OB_FAIL(data_descs_.push_back(data_desc))) {
    LOG_WARN("fail to push back", KR(ret));
  }
  return ret;
}

int ObLoadDataDirectImpl::DataDescIterator::get_next_data_desc(DataDesc &data_desc)
{
  int ret = OB_SUCCESS;
  if (pos_ >= data_descs_.count()) {
    ret = OB_ITER_END;
  } else {
    data_desc = data_descs_.at(pos_++);
  }
  return ret;
}

/**
 * DataBuffer
 */

ObLoadDataDirectImpl::DataBuffer::DataBuffer()
  : file_buffer_(nullptr), pos_(0), is_end_file_(false)
{
}

ObLoadDataDirectImpl::DataBuffer::~DataBuffer()
{
  reset();
}

void ObLoadDataDirectImpl::DataBuffer::reuse()
{
  if (nullptr != file_buffer_) {
    file_buffer_->reset();
  }
  pos_ = 0;
  is_end_file_ = false;
}

void ObLoadDataDirectImpl::DataBuffer::reset()
{
  if (nullptr != file_buffer_) {
    file_buffer_->~ObLoadFileBuffer();
    ob_free(file_buffer_);
    file_buffer_ = nullptr;
  }
  pos_ = 0;
}

int ObLoadDataDirectImpl::DataBuffer::init(int64_t capacity)
{
  int ret = OB_SUCCESS;
  if (OB_UNLIKELY(nullptr != file_buffer_)) {
    ret = OB_INIT_TWICE;
    LOG_WARN("ObLoadDataDirectImpl::DataBuffer init twice", KR(ret), KPC(file_buffer_));
  } else if (OB_UNLIKELY(capacity <= 0)) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid buffer capacity", KR(ret), K(capacity));
  } else {
    const int64_t alloc_size =
      MIN(capacity + sizeof(ObLoadFileBuffer), ObLoadFileBuffer::MAX_BUFFER_SIZE);
    ObMemAttr attr(MTL_ID(), "MTL_DataBuffer");
    void *buf = nullptr;
    if (OB_ISNULL(buf = ob_malloc(alloc_size, attr))) {
      ret = OB_ALLOCATE_MEMORY_FAILED;
      LOG_WARN("fail to allocate memory", KR(ret), K(alloc_size));
    } else {
      file_buffer_ = new (buf) ObLoadFileBuffer(alloc_size - sizeof(ObLoadFileBuffer));
    }
  }
  return ret;
}

bool ObLoadDataDirectImpl::DataBuffer::is_valid() const
{
  return nullptr != file_buffer_ && pos_ >= 0 && pos_ <= file_buffer_->get_data_len();
}

int64_t ObLoadDataDirectImpl::DataBuffer::get_data_length() const
{
  int64_t len = 0;
  if (is_valid()) {
    len = file_buffer_->get_data_len() - pos_;
  }
  return len;
}

int64_t ObLoadDataDirectImpl::DataBuffer::get_remain_length() const
{
  int64_t len = 0;
  if (is_valid()) {
    len = file_buffer_->get_remain_len();
  }
  return len;
}

bool ObLoadDataDirectImpl::DataBuffer::empty() const { return 0 == get_data_length(); }

char *ObLoadDataDirectImpl::DataBuffer::data() const
{
  char *buf = nullptr;
  if (is_valid()) {
    buf = file_buffer_->begin_ptr() + pos_;
  }
  return buf;
}

void ObLoadDataDirectImpl::DataBuffer::advance(int64_t length)
{
  OB_ASSERT(get_data_length() >= length);
  pos_ += length;
}

void ObLoadDataDirectImpl::DataBuffer::update_data_length(int64_t length)
{
  OB_ASSERT(get_remain_length() >= length);
  file_buffer_->update_pos(length);
}

int ObLoadDataDirectImpl::DataBuffer::squash()
{
  int ret = OB_SUCCESS;
  if (OB_UNLIKELY(!is_valid())) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("unexpected invalid buffer", KR(ret));
  } else {
    const int64_t length = file_buffer_->get_data_len() - pos_;
    if (length > 0) {
      MEMMOVE(file_buffer_->begin_ptr(), file_buffer_->begin_ptr() + pos_, length);
    }
    reuse();
    file_buffer_->update_pos(length);
  }
  return ret;
}

void ObLoadDataDirectImpl::DataBuffer::swap(DataBuffer &other)
{
  std::swap(file_buffer_, other.file_buffer_);
  std::swap(pos_, other.pos_);
}

/**
 * DataReader
 */

ObLoadDataDirectImpl::DataReader::DataReader()
  : execute_ctx_(nullptr), end_offset_(0), read_raw_(false), is_iter_end_(false), is_inited_(false)
{
}

int ObLoadDataDirectImpl::DataReader::init(const DataAccessParam &data_access_param,
                                           LoadExecuteContext &execute_ctx,
                                           const DataDesc &data_desc, bool read_raw)
{
  int ret = OB_SUCCESS;
  if (IS_INIT) {
    ret = OB_INIT_TWICE;
    LOG_WARN("ObLoadDataDirectImpl::DataReader init twice", KR(ret), KP(this));
  } else {
    execute_ctx_ = &execute_ctx;
    read_raw_ = read_raw;
    if (OB_FAIL(csv_parser_.init(data_access_param.file_format_, data_access_param.file_column_num_,
                                 data_access_param.file_cs_type_))) {
      LOG_WARN("fail to init csv parser", KR(ret), K(data_access_param));
    }
    if (OB_SUCC(ret) && !read_raw) {
      ObCSVFormats formats;
      formats.init(data_access_param.file_format_);
      if (OB_FAIL(data_trimer_.init(*execute_ctx_->allocator_, formats))) {
        LOG_WARN("fail to init data trimer", KR(ret));
      }
    }
    if (OB_SUCC(ret)) {
      end_offset_ = data_desc.end_;
      if (OB_FAIL(io_accessor_.init(data_access_param, data_desc.filename_))) {
        LOG_WARN("fail to init io device", KR(ret), K(data_desc));
      } else if (end_offset_ == -1 && OB_FAIL(io_accessor_.get_file_size(end_offset_))) {
        LOG_WARN("fail to get file size", KR(ret), K(data_desc));
      } else {
        io_accessor_.seek(data_desc.start_);
        ATOMIC_AAF(&execute_ctx_->job_stat_->total_bytes_, (end_offset_ - data_desc.start_));
      }
    }
    if (OB_SUCC(ret)) {
      is_inited_ = true;
    }
  }
  return ret;
}

int ObLoadDataDirectImpl::DataReader::get_next_buffer(ObLoadFileBuffer &file_buffer,
                                                      int64_t &line_count, int64_t limit)
{
  int ret = OB_SUCCESS;
  if (IS_NOT_INIT) {
    ret = OB_NOT_INIT;
    LOG_WARN("ObLoadDataDirectImpl::DataReader not init", KR(ret), KP(this));
  } else if (OB_UNLIKELY(limit <= 0)) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid args", KR(ret), K(limit));
  } else if (OB_UNLIKELY(read_raw_)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("unexpected read complete line buffer", KR(ret));
  } else if (is_iter_end_) {
    ret = OB_ITER_END;
  } else {
    file_buffer.reset();
    line_count = 0;

    // 1. 从data_trimer中恢复出上次读取留下的数据
    if (OB_FAIL(data_trimer_.recover_incomplate_data(file_buffer))) {
      LOG_WARN("fail to recover incomplate data", KR(ret));
    }
    // 2. 从文件里读取后续的数据
    else if (!is_end_file()) {
      int64_t read_count = 0;
      int64_t read_size = 0;
      if (FALSE_IT(read_count =
                     MIN(file_buffer.get_remain_len(), end_offset_ - io_accessor_.get_offset()))) {
      } else if (OB_FAIL(io_accessor_.read(file_buffer.current_ptr(), read_count, read_size))) {
        LOG_WARN("fail to read file", KR(ret));
      } else if (OB_UNLIKELY(read_count != read_size)) {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("unexpected read size", KR(ret), K(read_count), K(read_size), K(end_offset_));
      } else {
        file_buffer.update_pos(read_size); // 更新buffer中数据长度
        ATOMIC_AAF(&execute_ctx_->job_stat_->read_bytes_, read_size);
      }
    }
    // 3. 从buffer中找出完整的行，剩下的数据缓存到data_trimer
    if (OB_SUCC(ret)) {
      if (!file_buffer.is_valid()) {
        is_iter_end_ = true;
        ret = OB_ITER_END;
      } else {
        int64_t complete_cnt = limit;
        int64_t complete_len = 0;
        if (OB_FAIL(ObLoadDataBase::pre_parse_lines(file_buffer, csv_parser_, is_end_file(),
                                                    complete_len, complete_cnt))) {
          LOG_WARN("fail to fast_lines_parse", KR(ret));
        } else if (OB_UNLIKELY(0 == complete_len)) {
          ret = OB_NOT_SUPPORTED;
          LOG_WARN("direct-load does not support big row", KR(ret), "size",
                   file_buffer.get_data_len());
        } else if (OB_FAIL(data_trimer_.backup_incomplate_data(file_buffer, complete_len))) {
          LOG_WARN("fail to back up data", KR(ret));
        } else {
          data_trimer_.commit_line_cnt(complete_cnt);
          line_count = complete_cnt;
          LOG_DEBUG("LOAD DATA backup", "data", data_trimer_.get_incomplate_data_string());
        }
      }
    }
  }
  return ret;
}

int ObLoadDataDirectImpl::DataReader::get_next_raw_buffer(DataBuffer &data_buffer)
{
  int ret = OB_SUCCESS;
  if (IS_NOT_INIT) {
    ret = OB_NOT_INIT;
    LOG_WARN("ObLoadDataDirectImpl::DataReader not init", KR(ret), KP(this));
  } else if (OB_UNLIKELY(!read_raw_)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("unexpected read raw buffer", KR(ret));
  } else if (is_end_file()) {
    ret = OB_ITER_END;
  } else if (data_buffer.get_remain_length() > 0) {
    const int64_t read_count =
      MIN(data_buffer.get_remain_length(), end_offset_ - io_accessor_.get_offset());
    int64_t read_size = 0;
    if (OB_FAIL(io_accessor_.read(data_buffer.data() + data_buffer.get_data_length(), read_count,
                                  read_size))) {
      LOG_WARN("fail to read file", KR(ret));
    } else if (OB_UNLIKELY(read_count != read_size)) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("unexpected read size", KR(ret), K(read_count), K(read_size), K(end_offset_));
    } else {
      data_buffer.update_data_length(read_size);
      ATOMIC_AAF(&execute_ctx_->job_stat_->read_bytes_, read_size);
    }
  }
  return ret;
}

/**
 * DataParser
 */

ObLoadDataDirectImpl::DataParser::DataParser()
  : data_buffer_(nullptr),
    start_line_no_(0),
    pos_(0),
    logger_(nullptr),
    is_inited_(false)
{
}

ObLoadDataDirectImpl::DataParser::~DataParser()
{
}

int ObLoadDataDirectImpl::DataParser::init(const DataAccessParam &data_access_param, Logger &logger)
{
  int ret = OB_SUCCESS;
  if (IS_INIT) {
    ret = OB_INIT_TWICE;
    LOG_WARN("ObLoadDataDirectImpl::DataParser init twice", KR(ret), KP(this));
  } else {
    if (OB_FAIL(csv_parser_.init(data_access_param.file_format_, data_access_param.file_column_num_,
                                 data_access_param.file_cs_type_))) {
      LOG_WARN("fail to init csv parser", KR(ret));
    } else if (OB_FAIL(escape_buffer_.init())) {
      LOG_WARN("fail to init data buffer", KR(ret));
    } else {
      logger_ = &logger;
      is_inited_ = true;
    }
  }
  return ret;
}

int ObLoadDataDirectImpl::DataParser::parse(const ObString &file_name, int64_t start_line_no,
                                            DataBuffer &data_buffer)
{
  int ret = OB_SUCCESS;
  if (IS_NOT_INIT) {
    ret = OB_NOT_INIT;
    LOG_WARN("ObLoadDataDirectImpl::DataParser not init", KR(ret), KP(this));
  } else if (OB_UNLIKELY(!data_buffer.is_valid() || data_buffer.empty())) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid args", KR(ret), K(data_buffer));
  } else {
    file_name_ = file_name;
    start_line_no_ = start_line_no;
    pos_ = 0;
    data_buffer_ = &data_buffer;
  }
  return ret;
}

int ObLoadDataDirectImpl::DataParser::get_next_row(ObNewRow &row)
{
  int ret = OB_SUCCESS;
  if (IS_NOT_INIT) {
    ret = OB_NOT_INIT;
    LOG_WARN("ObLoadDataDirectImpl::DataParser not init", KR(ret), KP(this));
  } else if (OB_ISNULL(data_buffer_) || OB_ISNULL(row.cells_)) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid args", KR(ret), KP(data_buffer_), K(row));
  } else if (data_buffer_->empty()) {
    ret = OB_ITER_END;
  } else {
    const char *str = data_buffer_->data();
    const char *end = str + data_buffer_->get_data_length();
    ObSEArray<ObCSVGeneralParser::LineErrRec, 1> err_records;
    int64_t nrows = 1;
    auto handle_one_line = [](ObIArray<ObCSVGeneralParser::FieldValue> &fields_per_line) -> int {
      UNUSED(fields_per_line);
      return OB_SUCCESS;
    };
    ret = csv_parser_.scan<decltype(handle_one_line), true>(
      str, end, nrows, escape_buffer_.file_buffer_->begin_ptr(),
      escape_buffer_.file_buffer_->begin_ptr() + escape_buffer_.file_buffer_->get_buffer_size(),
      handle_one_line, err_records, data_buffer_->is_end_file_);
    if (OB_FAIL(ret)) {
      LOG_WARN("fail to scan", KR(ret));
    } else if (OB_UNLIKELY(!err_records.empty())) {
      ret = OB_ERR_WRONG_VALUE;
      LOG_WARN("parse error", KR(ret));
    } else if (0 == nrows) {
      ret = OB_ITER_END;
    } else {
      ++pos_;
      const ObIArray<ObCSVGeneralParser::FieldValue> &field_values_in_file =
        csv_parser_.get_fields_per_line();
      for (int64_t i = 0; OB_SUCC(ret) && i < row.count_; ++i) {
        const ObCSVGeneralParser::FieldValue &str_v = field_values_in_file.at(i);
        ObObj &obj = row.cells_[i];
        if (str_v.is_null_) {
          obj.set_null();
        } else {
          obj.set_string(ObVarcharType, ObString(str_v.len_, str_v.ptr_));
          obj.set_collation_type(
            ObCharset::get_default_collation(csv_parser_.get_format().cs_type_));
        }
      }
    }
    for (int64_t i = 0; i < err_records.count(); ++i) {
      log_error_line(err_records.at(i).err_code, start_line_no_ + pos_ + 1);
    }
    data_buffer_->advance(str - data_buffer_->data());
  }
  return ret;
}

void ObLoadDataDirectImpl::DataParser::log_error_line(int err_ret, int64_t err_line_no)
{
  int ret = OB_SUCCESS;
  if (OB_FAIL(logger_->log_error_line(file_name_, err_line_no, err_ret))) {
    LOG_WARN("fail to log error line", KR(ret), K(err_ret), K(err_line_no));
  }
}

/**
 * SimpleDataSplitUtils
 */

bool ObLoadDataDirectImpl::SimpleDataSplitUtils::is_simple_format(
  const ObDataInFileStruct &file_format, ObCollationType file_cs_type)
{
  bool bret = false;
  ObCharsetType char_set = ObCharset::charset_type_by_coll(file_cs_type);
  if (char_set == CHARSET_UTF8MB4 && file_format.line_term_str_.length() == 1 &&
      file_format.line_start_str_.empty() && file_format.field_term_str_.length() == 1 &&
      file_format.field_enclosed_char_ == INT64_MAX &&
      file_format.field_escaped_char_ == INT64_MAX &&
      file_format.line_term_str_.ptr()[0] != file_format.field_term_str_.ptr()[0]) {
    bret = true;
  }
  return bret;
}

int ObLoadDataDirectImpl::SimpleDataSplitUtils::split(const DataAccessParam &data_access_param,
                                                      const DataDesc &data_desc, int64_t count,
                                                      DataDescIterator &data_desc_iter)
{
  int ret = OB_SUCCESS;
  if (OB_UNLIKELY(!data_access_param.is_valid() || count <= 0)) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid args", KR(ret), K(data_access_param), K(count));
  } else if (OB_UNLIKELY(!is_simple_format(data_access_param.file_format_,
                                           data_access_param.file_cs_type_))) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("unexpected data format", KR(ret), K(data_access_param));
  } else if (1 == count) {
    if (OB_FAIL(data_desc_iter.add_data_desc(data_desc))) {
      LOG_WARN("fail to push back", KR(ret));
    }
  } else {
    int64_t end_offset = data_desc.end_;
    SequentialDataAccessor io_device;
    if (OB_FAIL(io_device.init(data_access_param, data_desc.filename_))) {
      LOG_WARN("fail to init io device", KR(ret), K(data_desc.filename_));
    } else if (-1 == end_offset && OB_FAIL(io_device.get_file_size(end_offset))) {
      LOG_WARN("fail to get io device file size", KR(ret), K(end_offset));
    } else {
      const int64_t file_size = end_offset - data_desc.start_;
      if (file_size < count * ObLoadFileBuffer::MAX_BUFFER_SIZE * 2) {
        // file is too small
        if (OB_FAIL(data_desc_iter.add_data_desc(data_desc))) {
          LOG_WARN("fail to push back", KR(ret));
        }
      } else {
        const char line_term_char = data_access_param.file_format_.line_term_str_.ptr()[0];
        const int64_t buf_size = (128LL << 10) + 1;
        const int64_t split_size = file_size / count;
        ObArenaAllocator allocator;
        char *buf = nullptr;
        int64_t read_size = 0;
        DataDesc data_desc_ret;
        data_desc_ret.file_idx_ = data_desc.file_idx_;
        data_desc_ret.filename_ = data_desc.filename_;
        data_desc_ret.start_ = data_desc.start_;
        allocator.set_tenant_id(MTL_ID());
        if (OB_ISNULL(buf = static_cast<char *>(allocator.alloc(buf_size)))) {
          LOG_WARN("fail to alloc memory", KR(ret));
        }
        for (int64_t i = 0; OB_SUCC(ret) && i < count - 1; ++i) {
          int64_t read_offset = data_desc.start_ + split_size * (i + 1);
          io_device.seek(read_offset);
          char *found = nullptr;
          while (OB_SUCC(ret) && end_offset > io_device.get_offset() && nullptr == found) {
            read_offset = io_device.get_offset();
            const int64_t read_count = MIN(end_offset - read_offset, buf_size - 1);
            if (OB_FAIL(io_device.read(buf, read_count, read_size))) {
              LOG_WARN("fail to do read", KR(ret), K(read_offset), K(read_count));
            } else if (OB_UNLIKELY(read_count != read_size)) {
              ret = OB_ERR_UNEXPECTED;
              LOG_WARN("unexpected read size", KR(ret), K(read_count), K(read_size));
            } else {
              buf[read_size] = '\0';
              found = STRCHR(buf, line_term_char);
            }
          }
          if (OB_SUCC(ret)) {
            if (nullptr == found) {
              ret = OB_ERR_UNEXPECTED;
              LOG_WARN("unexpected large row", KR(ret));
            } else {
              data_desc_ret.end_ = read_offset + (found - buf + 1);
              if (OB_FAIL(data_desc_iter.add_data_desc(data_desc_ret))) {
                LOG_WARN("fail to push back", KR(ret));
              } else {
                data_desc_ret.start_ = data_desc_ret.end_;
              }
            }
          }
        }
        if (OB_SUCC(ret)) {
          data_desc_ret.end_ = data_desc.end_;
          if (OB_FAIL(data_desc_iter.add_data_desc(data_desc_ret))) {
            LOG_WARN("fail to push back", KR(ret));
          }
        }
      }
    }
  }
  return ret;
}

/**
 * FileLoadExecutor
 */

ObLoadDataDirectImpl::FileLoadExecutor::FileLoadExecutor()
  : execute_param_(nullptr),
    execute_ctx_(nullptr),
    task_scheduler_(nullptr),
    worker_ctx_array_(nullptr),
    is_inited_(false)
{
}

ObLoadDataDirectImpl::FileLoadExecutor::~FileLoadExecutor()
{
  if (nullptr != task_scheduler_) {
    task_scheduler_->stop();
    task_scheduler_->wait();
    task_scheduler_->~ObITableLoadTaskScheduler();
    execute_ctx_->allocator_->free(task_scheduler_);
    task_scheduler_ = nullptr;
  }
  if (nullptr != worker_ctx_array_) {
    for (int64_t i = 0; i < execute_param_->parallel_; ++i) {
      WorkerContext *worker_ctx = worker_ctx_array_ + i;
      worker_ctx->~WorkerContext();
    }
    execute_ctx_->allocator_->free(worker_ctx_array_);
    worker_ctx_array_ = nullptr;
  }
  for (int64_t i = 0; i < handle_resource_.count(); ++i) {
    TaskHandle *handle = handle_resource_.at(i);
    handle->~TaskHandle();
    execute_ctx_->allocator_->free(handle);
  }
  handle_resource_.reset();
}

int ObLoadDataDirectImpl::FileLoadExecutor::inner_init(const LoadExecuteParam &execute_param,
                                                       LoadExecuteContext &execute_ctx,
                                                       int64_t handle_count)
{
  int ret = OB_SUCCESS;
  execute_param_ = &execute_param;
  execute_ctx_ = &execute_ctx;
  // init task_allocator_
  if (OB_FAIL(task_allocator_.init("TLD_TaskPool", execute_param_->tenant_id_))) {
    LOG_WARN("fail to init allocator", KR(ret));
  }
  // init task_scheduler_
  else if (OB_ISNULL(task_scheduler_ =
                         OB_NEWx(ObTableLoadTaskThreadPoolScheduler, (execute_ctx_->allocator_),
                                 execute_param_->parallel_, *execute_ctx_->allocator_))) {
    ret = OB_ALLOCATE_MEMORY_FAILED;
    LOG_WARN("fail to new ObTableLoadTaskThreadPoolScheduler", KR(ret));
  } else if (OB_FAIL(task_scheduler_->init())) {
    LOG_WARN("fail to init task scheduler", KR(ret));
  } else if (OB_FAIL(task_scheduler_->start())) {
    LOG_WARN("fail to start task scheduler", KR(ret));
  }
  // init worker_ctx_array_
  else if (OB_FAIL(init_worker_ctx_array())) {
    LOG_WARN("fail to init worker ctx array", KR(ret));
  }
  // task ctrl
  else if (OB_FAIL(task_controller_.init(handle_count))) {
    LOG_WARN("fail to init task controller", KR(ret), K(handle_count));
  } else if (OB_FAIL(handle_reserve_queue_.init(handle_count))) {
    LOG_WARN("fail to init handle reserve queue", KR(ret), K(handle_count));
  }
  for (int64_t i = 0; OB_SUCC(ret) && i < handle_count; ++i) {
    TaskHandle *handle = nullptr;
    if (OB_ISNULL(handle = OB_NEWx(TaskHandle, execute_ctx_->allocator_))) {
      ret = OB_ALLOCATE_MEMORY_FAILED;
      LOG_WARN("fail to allocate handler", KR(ret));
    } else if (OB_FAIL(handle->data_buffer_.init())) {
      LOG_WARN("fail to init data buffer", KR(ret));
    } else if (OB_FAIL(handle_reserve_queue_.push_back(handle))) {
      LOG_WARN("fail to push back handle to queue", KR(ret));
    } else if (OB_FAIL(handle_resource_.push_back(handle))) {
      LOG_WARN("fail to push back handle to array", KR(ret));
    }
    if (OB_FAIL(ret)) {
      if (nullptr != handle) {
        handle->~TaskHandle();
        execute_ctx_->allocator_->free(handle);
      }
    }
  }
  return ret;
}

int ObLoadDataDirectImpl::FileLoadExecutor::init_worker_ctx_array()
{
  int ret = OB_SUCCESS;
  void *buf = nullptr;
  if (OB_ISNULL(
        buf = execute_ctx_->allocator_->alloc(sizeof(WorkerContext) * execute_param_->parallel_))) {
    ret = OB_ALLOCATE_MEMORY_FAILED;
    LOG_WARN("fail to allocate memory", KR(ret));
  } else {
    worker_ctx_array_ = new (buf) WorkerContext[execute_param_->parallel_];
    for (int64_t i = 0; OB_SUCC(ret) && i < execute_param_->parallel_; ++i) {
      WorkerContext *worker_ctx = worker_ctx_array_ + i;
      if (OB_FAIL(worker_ctx->data_parser_.init(execute_param_->data_access_param_,
                                                *execute_ctx_->logger_))) {
        LOG_WARN("fail to init data parser", KR(ret), K(execute_param_->data_access_param_));
      } else if (OB_FAIL(
                   worker_ctx->objs_.create(execute_param_->data_access_param_.file_column_num_ *
                                              execute_param_->batch_row_count_,
                                            *execute_ctx_->allocator_))) {
        LOG_WARN("fail to create obj array", KR(ret));
      }
    }
  }
  return ret;
}

int ObLoadDataDirectImpl::FileLoadExecutor::execute()
{
  int ret = OB_SUCCESS;
  if (IS_NOT_INIT) {
    ret = OB_NOT_INIT;
    LOG_WARN("ObLoadDataDirectImpl::FileLoadExecutor not init", KR(ret), KP(this));
  } else {
    if (OB_FAIL(prepare_execute())) {
      LOG_WARN("fail to prepare execute", KR(ret));
    }

    while (OB_SUCC(ret) && OB_SUCC(execute_ctx_->check_status())) {
      TaskHandle *handle = nullptr;
      if (OB_FAIL(get_next_task_handle(handle))) {
        if (OB_UNLIKELY(OB_ITER_END != ret)) {
          LOG_WARN("fail to get next task handle", KR(ret));
        } else {
          ret = OB_SUCCESS;
          break;
        }
      } else {
        ObTableLoadTask *task = nullptr;
        if (OB_FAIL(alloc_task(task))) {
          LOG_WARN("fail to alloc task", KR(ret));
        } else if (OB_FAIL(fill_task(handle, task))) {
          LOG_WARN("fail to fill task", KR(ret));
        } else if (OB_FAIL(task_scheduler_->add_task(handle->session_id_ - 1, task))) {
          LOG_WARN("fail to add task", KR(ret), K(handle->session_id_), KPC(task));
        }
        if (OB_FAIL(ret)) {
          if (nullptr != task) {
            free_task(task);
          }
        }
      }
      if (OB_FAIL(ret)) {
        if (nullptr != handle) {
          task_finished(handle);
        }
      }
    }

    wait_all_task_finished();

    if (OB_SUCC(ret)) {
      if (OB_FAIL(handle_all_task_result())) {
        LOG_WARN("fail to handle all task result", KR(ret));
      }
    }

  }
  return ret;
}

int ObLoadDataDirectImpl::FileLoadExecutor::alloc_task(ObTableLoadTask *&task)
{
  int ret = OB_SUCCESS;
  if (IS_NOT_INIT) {
    ret = OB_NOT_INIT;
    LOG_WARN("ObLoadDataDirectImpl::FileLoadExecutor not init", KR(ret), KP(this));
  } else {
    if (OB_ISNULL(task = task_allocator_.alloc(execute_param_->tenant_id_))) {
      ret = OB_ALLOCATE_MEMORY_FAILED;
      LOG_WARN("fail to alloc task", KR(ret));
    }
  }
  return ret;
}

void ObLoadDataDirectImpl::FileLoadExecutor::free_task(ObTableLoadTask *task)
{
  int ret = OB_SUCCESS;
  if (IS_NOT_INIT) {
    ret = OB_NOT_INIT;
    LOG_WARN("ObLoadDataDirectImpl::FileLoadExecutor not init", KR(ret), KP(this));
  } else if (OB_ISNULL(task)) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid args", KR(ret), KP(task));
  } else {
    task_allocator_.free(task);
  }
}

int ObLoadDataDirectImpl::FileLoadExecutor::fetch_task_handle(TaskHandle *&handle)
{
  int ret = OB_SUCCESS;
  handle = nullptr;
  if (OB_FAIL(task_controller_.on_next_task())) {
    LOG_WARN("fail to on next task", KR(ret));
  } else {
    if (OB_FAIL(handle_reserve_queue_.pop(handle))) {
      LOG_WARN("fail to pop handle", KR(ret));
    } else if (OB_ISNULL(handle)) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("unexpected null handle", KR(ret));
    } else if (OB_FAIL(handle_task_result(handle->task_id_, handle->result_))) {
      LOG_WARN("fail to handle task result", KR(ret), KPC(handle));
    }
    if (OB_FAIL(ret)) {
      // 主动调用on_task_finished, 防止wait_all_task_finished卡住
      task_controller_.on_task_finished();
    }
  }
  return ret;
}

void ObLoadDataDirectImpl::FileLoadExecutor::task_finished(TaskHandle *handle)
{
  int ret = OB_SUCCESS;
  if (OB_ISNULL(handle)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("unexpected null handle", KR(ret));
  } else {
    handle->result_.finished_ts_ = ObTimeUtil::current_time();
    int ret1 = handle_reserve_queue_.push_back(handle);
    MEM_BARRIER();
    int ret2 = task_controller_.on_task_finished();
    if (OB_UNLIKELY(OB_FAIL(ret1) || OB_FAIL(ret2))) {
      LOG_ERROR("fail to finished task", KR(ret1), KR(ret2));
    }
  }
}

int ObLoadDataDirectImpl::FileLoadExecutor::handle_task_result(int64_t task_id, TaskResult &result)
{
  int ret = OB_SUCCESS;
  if (OB_FAIL(result.ret_)) {
    LOG_WARN("task result is failed", KR(ret), K(task_id));
  }
  /*
  if (0 != result.created_ts_) {
    int64_t wait_us = 0, proccess_us = 0;
    if (0 != result.start_process_ts_) {
      wait_us = result.start_process_ts_ - result.created_ts_;
      proccess_us = result.finished_ts_ - result.start_process_ts_;
    } else {
      wait_us = result.finished_ts_ - result.created_ts_;
    }
  }
  */
  result.reset();
  return ret;
}

int ObLoadDataDirectImpl::FileLoadExecutor::handle_all_task_result()
{
  int ret = OB_SUCCESS;
  TaskHandle *handle = nullptr;
  while (OB_SUCC(ret) && handle_reserve_queue_.count() > 0) {
    if (OB_FAIL(handle_reserve_queue_.pop(handle))) {
      LOG_WARN("fail to pop handle", KR(ret));
    } else if (OB_ISNULL(handle)) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("unexpected null handle", KR(ret));
    } else if (OB_FAIL(handle_task_result(handle->task_id_, handle->result_))) {
      LOG_WARN("fail to handle task result", KR(ret), KPC(handle));
    }
  }
  return ret;
}

void ObLoadDataDirectImpl::FileLoadExecutor::wait_all_task_finished()
{
  const int64_t processing_task_cnt = task_controller_.get_processing_task_cnt();
  const int64_t total_task_cnt = task_controller_.get_total_task_cnt();
  LOG_INFO("LOAD DATA wait all task finish", K(processing_task_cnt), K(total_task_cnt));
  task_controller_.wait_all_task_finish(execute_param_->combined_name_.ptr(), THIS_WORKER.get_timeout_ts());
}

int ObLoadDataDirectImpl::FileLoadExecutor::process_task_handle(int64_t worker_idx,
                                                                TaskHandle *handle,
                                                                int64_t &total_processed_line_count)
{
  int ret = OB_SUCCESS;
  if (IS_NOT_INIT) {
    ret = OB_NOT_INIT;
    LOG_WARN("ObLoadDataDirectImpl::FileLoadExecutor not init", KR(ret), KP(this));
  } else if (OB_UNLIKELY(worker_idx < 0 || worker_idx >= execute_param_->parallel_ ||
                         nullptr == handle)) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid args", KR(ret), KPC(execute_param_), K(worker_idx), KP(handle));
  } else {
    WorkerContext &worker_ctx = worker_ctx_array_[worker_idx];
    const int64_t column_count = execute_param_->data_access_param_.file_column_num_;
    const int64_t data_buffer_length = handle->data_buffer_.get_data_length();
    int64_t parsed_bytes = 0;
    int64_t processed_line_count = 0;
    total_processed_line_count = 0;
    ObNewRow row;
    bool is_iter_end = false;
    if (OB_FAIL(worker_ctx.data_parser_.parse(handle->data_desc_.filename_, handle->start_line_no_,
                                              handle->data_buffer_))) {
      LOG_WARN("fail to parse data", KR(ret), KPC(handle));
    } else {
      row.cells_ = worker_ctx.objs_.ptr();
      row.count_ = column_count;
    }
    while (OB_SUCC(ret) && !is_iter_end) {
      // 每个新的batch需要分配一个新的shared_allocator
      ObTableLoadSharedAllocatorHandle allocator_handle =
        ObTableLoadSharedAllocatorHandle::make_handle();
      if (!allocator_handle) {
        ret = OB_ALLOCATE_MEMORY_FAILED;
        LOG_WARN("failed to make allocator handle", KR(ret));
      }
      ObTableLoadObjRowArray obj_rows;
      obj_rows.set_allocator(allocator_handle);

      while (OB_SUCC(ret) && (processed_line_count < execute_param_->batch_row_count_)) {
        if (OB_FAIL(worker_ctx.data_parser_.get_next_row(row))) {
          if (OB_UNLIKELY(OB_ITER_END != ret)) {
            LOG_WARN("fail to get next row", KR(ret));
          } else {
            ret = OB_SUCCESS;
            is_iter_end = true;
            break;
          }
        } else {
          //此时row中的每个obj的内容指向的是data parser中的内存
          //因此得把它们深拷贝一遍
          ObTableLoadObjRow tmp_obj_row;
          if (OB_FAIL(tmp_obj_row.deep_copy_and_assign(row.cells_, row.count_, allocator_handle))) {
            LOG_WARN("failed to deep copy add assign to tmp_obj_row", KR(ret));
          } else if (OB_FAIL(obj_rows.push_back(tmp_obj_row))) {
            LOG_WARN("failed to add tmp_obj_row to obj_rows", KR(ret));
          } else {
            ++processed_line_count;
            row.cells_ += column_count;
          }
        }
      } // end while()

      if (OB_SUCC(ret) && (processed_line_count > 0)) {
        if (OB_FAIL(execute_ctx_->direct_loader_->write(handle->session_id_, obj_rows))) {
          LOG_WARN("fail to write objs", KR(ret));
        } else {
          total_processed_line_count += processed_line_count;
          processed_line_count = 0;
          row.cells_ = worker_ctx.objs_.ptr();
        }
      }
    } // end while()
    parsed_bytes = data_buffer_length - handle->data_buffer_.get_data_length();
    handle->result_.proccessed_row_count_ += total_processed_line_count;
    handle->result_.parsed_bytes_ += parsed_bytes;
    ATOMIC_AAF(&execute_ctx_->job_stat_->parsed_rows_, total_processed_line_count);
    ATOMIC_AAF(&execute_ctx_->job_stat_->parsed_bytes_, parsed_bytes);
  }
  return ret;
}

/**
 * FileLoadTaskCallback
 */

class ObLoadDataDirectImpl::FileLoadTaskCallback : public ObITableLoadTaskCallback
{
public:
  FileLoadTaskCallback(FileLoadExecutor *load_executor, TaskHandle *handle)
    : load_executor_(load_executor), handle_(handle)
  {
  }
  virtual ~FileLoadTaskCallback() = default;
  void callback(int ret_code, ObTableLoadTask *task) override
  {
    handle_->result_.ret_ = ret_code;
    load_executor_->task_finished(handle_);
    load_executor_->free_task(task);
  }
private:
  FileLoadExecutor *load_executor_;
  TaskHandle *handle_;
};

/**
 * LargeFileLoadTaskProcessor
 */

class ObLoadDataDirectImpl::LargeFileLoadTaskProcessor : public ObITableLoadTaskProcessor
{
public:
  LargeFileLoadTaskProcessor(ObTableLoadTask &task, FileLoadExecutor *file_load_executor,
                             int64_t worker_idx, TaskHandle *handle)
    : ObITableLoadTaskProcessor(task),
      file_load_executor_(file_load_executor),
      worker_idx_(worker_idx),
      handle_(handle)
  {
  }
  virtual ~LargeFileLoadTaskProcessor() = default;
  int process() override;
  INHERIT_TO_STRING_KV("task_processor", ObITableLoadTaskProcessor, KPC_(handle));
private:
  FileLoadExecutor *file_load_executor_;
  int64_t worker_idx_;
  TaskHandle *handle_;
};

int ObLoadDataDirectImpl::LargeFileLoadTaskProcessor::process()
{
  int ret = OB_SUCCESS;
  handle_->result_.start_process_ts_ = ObTimeUtil::current_time();
  int64_t line_count = 0;
  if (OB_FAIL(file_load_executor_->process_task_handle(worker_idx_, handle_, line_count))) {
    LOG_WARN("fail to process task handle", KR(ret));
  }
  return ret;
}

/**
 * LargeFileLoadExecutor
 */

ObLoadDataDirectImpl::LargeFileLoadExecutor::LargeFileLoadExecutor()
  : next_session_id_(1), total_line_count_(0)
{
}

ObLoadDataDirectImpl::LargeFileLoadExecutor::~LargeFileLoadExecutor()
{
}

int ObLoadDataDirectImpl::LargeFileLoadExecutor::init(const LoadExecuteParam &execute_param,
                                                      LoadExecuteContext &execute_ctx,
                                                      const DataDescIterator &data_desc_iter)
{
  int ret = OB_SUCCESS;
  if (IS_INIT) {
    ret = OB_INIT_TWICE;
    LOG_WARN("ObLoadDataDirectImpl::LargeFileLoadExecutor init twice", KR(ret), KP(this));
  } else if (OB_UNLIKELY(!execute_param.is_valid() || !execute_ctx.is_valid() ||
                         1 != data_desc_iter.count())) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid args", KR(ret), K(execute_param), K(execute_ctx), K(data_desc_iter));
  } else {
    DataDescIterator copy_data_desc_iter;
    DataDesc data_desc;
    if (OB_FAIL(inner_init(execute_param, execute_ctx, execute_param.data_mem_usage_limit_))) {
      LOG_WARN("fail to init inner", KR(ret));
    }
    // data_desc_
    else if (OB_FAIL(copy_data_desc_iter.copy(data_desc_iter))) {
      LOG_WARN("fail to copy data desc iter", KR(ret));
    } else if (OB_FAIL(copy_data_desc_iter.get_next_data_desc(data_desc))) {
      LOG_WARN("fail to get next data desc", KR(ret));
    }
    // expr_buffer_
    else if (OB_FAIL(expr_buffer_.init())) {
      LOG_WARN("fail to init data buffer", KR(ret));
    }
    // data_reader_
    else if (OB_FAIL(
               data_reader_.init(execute_param_->data_access_param_, *execute_ctx_, data_desc))) {
      LOG_WARN("fail to init data reader", KR(ret));
    } else {
      data_desc_ = data_desc;
      is_inited_ = true;
    }
  }
  return ret;
}

int ObLoadDataDirectImpl::LargeFileLoadExecutor::prepare_execute()
{
  int ret = OB_SUCCESS;
  if (OB_FAIL(skip_ignore_rows())) {
    LOG_WARN("fail to skip ignore rows", KR(ret));
  }
  return ret;
}

int ObLoadDataDirectImpl::LargeFileLoadExecutor::get_next_task_handle(TaskHandle *&handle)
{
  int ret = OB_SUCCESS;
  int64_t current_line_count = 0;
  expr_buffer_.reuse();
  if (OB_FAIL(data_reader_.get_next_buffer(*expr_buffer_.file_buffer_, current_line_count))) {
    if (OB_UNLIKELY(OB_ITER_END != ret)) {
      LOG_WARN("fail to get next buffer", KR(ret));
    }
  } else if (OB_FAIL(fetch_task_handle(handle))) {
    LOG_WARN("fail to fetch task handle", KR(ret));
  } else {
    handle->task_id_ = task_controller_.get_next_task_id();
    handle->session_id_ = get_session_id();
    handle->data_desc_ = data_desc_;
    handle->start_line_no_ = total_line_count_ + 1;
    handle->result_.created_ts_ = ObTimeUtil::current_time();
    handle->data_buffer_.swap(expr_buffer_);
    handle->data_buffer_.is_end_file_ = data_reader_.is_end_file();
    total_line_count_ += current_line_count;
  }
  return ret;
}

int ObLoadDataDirectImpl::LargeFileLoadExecutor::fill_task(TaskHandle *handle,
                                                           ObTableLoadTask *task)
{
  int ret = OB_SUCCESS;
  if (OB_FAIL(
        task->set_processor<LargeFileLoadTaskProcessor>(this, handle->session_id_ - 1, handle))) {
    LOG_WARN("fail to set large file load task processor", KR(ret));
  } else if (OB_FAIL(task->set_callback<FileLoadTaskCallback>(this, handle))) {
    LOG_WARN("fail to set file load task callback", KR(ret));
  }
  return ret;
}

int32_t ObLoadDataDirectImpl::LargeFileLoadExecutor::get_session_id()
{
  if (next_session_id_ > execute_param_->parallel_) {
    next_session_id_ = 1;
  }
  return next_session_id_++;
}

int ObLoadDataDirectImpl::LargeFileLoadExecutor::skip_ignore_rows()
{
  int ret = OB_SUCCESS;
  const int64_t ignore_row_num = execute_param_->ignore_row_num_;
  if (ignore_row_num > 0) {
    int64_t line_count = 0;
    while (OB_SUCC(ret) && data_reader_.get_lines_count() < ignore_row_num) {
      if (OB_FAIL(data_reader_.get_next_buffer(*expr_buffer_.file_buffer_, line_count,
                                               ignore_row_num - data_reader_.get_lines_count()))) {
        if (OB_UNLIKELY(OB_ITER_END != ret)) {
          LOG_WARN("fail to get next buffer", KR(ret));
        } else {
          ret = OB_SUCCESS;
          break;
        }
      }
    }
    LOG_INFO("LOAD DATA skip ignore rows", KR(ret), K(ignore_row_num),
             K(data_reader_.get_lines_count()));
  }
  return ret;
}

/**
 * MultiFilesLoadTaskProcessor
 */

class ObLoadDataDirectImpl::MultiFilesLoadTaskProcessor : public ObITableLoadTaskProcessor
{
public:
  MultiFilesLoadTaskProcessor(ObTableLoadTask &task, const LoadExecuteParam *execute_param,
                              LoadExecuteContext *execute_ctx, FileLoadExecutor *file_load_executor,
                              int64_t worker_idx, TaskHandle *handle)
    : ObITableLoadTaskProcessor(task),
      execute_param_(execute_param),
      execute_ctx_(execute_ctx),
      file_load_executor_(file_load_executor),
      worker_idx_(worker_idx),
      handle_(handle)
  {
  }
  virtual ~MultiFilesLoadTaskProcessor() = default;
  int process() override;
  INHERIT_TO_STRING_KV("task_processor", ObITableLoadTaskProcessor, KPC_(handle));
private:
  int skip_ignore_rows(int64_t &skip_line_count);
private:
  const LoadExecuteParam *execute_param_;
  LoadExecuteContext *execute_ctx_;
  FileLoadExecutor *file_load_executor_;
  int64_t worker_idx_;
  TaskHandle *handle_;
  DataReader data_reader_;
};

int ObLoadDataDirectImpl::MultiFilesLoadTaskProcessor::process()
{
  int ret = OB_SUCCESS;
  handle_->result_.start_process_ts_ = ObTimeUtil::current_time();
  int64_t total_line_count = 0;
  int64_t current_line_count = 0;
  if (OB_FAIL(data_reader_.init(execute_param_->data_access_param_, *execute_ctx_,
                                handle_->data_desc_, true))) {
    LOG_WARN("fail to init data reader", KR(ret));
  } else if (0 == handle_->data_desc_.file_idx_ && 0 == handle_->data_desc_.start_) {
    if (OB_FAIL(skip_ignore_rows(total_line_count))) {
      LOG_WARN("fail to skip ignore rows", KR(ret));
    } else if (OB_UNLIKELY(total_line_count < execute_param_->ignore_row_num_)) {
      ret = OB_NOT_SUPPORTED;
      LOG_WARN("direct-load does not support ignore rows exceed the first file", KR(ret),
               K(total_line_count), K(execute_param_->ignore_row_num_));
    } else if (!handle_->data_buffer_.empty()) {
      handle_->start_line_no_ = total_line_count + 1;
      if (OB_FAIL(
            file_load_executor_->process_task_handle(worker_idx_, handle_, current_line_count))) {
        LOG_WARN("fail to process task handle", KR(ret));
      } else {
        total_line_count += current_line_count;
        current_line_count = 0;
      }
    }
  }
  while (OB_SUCC(ret) && OB_SUCC(execute_ctx_->check_status())) {
    if (OB_FAIL(handle_->data_buffer_.squash())) {
      LOG_WARN("fail to squash data buffer", KR(ret));
    } else if (OB_FAIL(data_reader_.get_next_raw_buffer(handle_->data_buffer_))) {
      if (OB_UNLIKELY(OB_ITER_END != ret)) {
        LOG_WARN("fail to get next buffer", KR(ret));
      } else {
        ret = OB_SUCCESS;
        if (OB_UNLIKELY(!handle_->data_buffer_.empty())) {
          ret = OB_ERR_UNEXPECTED;
          LOG_WARN("has incomplate data", KR(ret));
        }
        break;
      }
    } else {
      handle_->data_buffer_.is_end_file_ = data_reader_.is_end_file();
      handle_->start_line_no_ = total_line_count + 1;
      if (OB_FAIL(
            file_load_executor_->process_task_handle(worker_idx_, handle_, current_line_count))) {
        LOG_WARN("fail to process task handle", KR(ret));
      } else if (OB_UNLIKELY(0 == current_line_count)) {
        ret = OB_NOT_SUPPORTED;
        LOG_WARN("direct-load does not support big row", KR(ret), "size",
                 handle_->data_buffer_.get_data_length());
      } else {
        total_line_count += current_line_count;
        current_line_count = 0;
      }
    }
  }
  return ret;
}

int ObLoadDataDirectImpl::MultiFilesLoadTaskProcessor::skip_ignore_rows(int64_t &skip_line_count)
{
  int ret = OB_SUCCESS;
  const int64_t ignore_row_num = execute_param_->ignore_row_num_;
  skip_line_count = 0;
  if (ignore_row_num > 0) {
    DataBuffer &data_buffer = handle_->data_buffer_;
    data_buffer.reuse();
    while (OB_SUCC(ret) && skip_line_count < ignore_row_num) {
      if (OB_FAIL(data_buffer.squash())) {
        LOG_WARN("fail to squash data buffer", KR(ret));
      } else if (OB_FAIL(data_reader_.get_next_raw_buffer(data_buffer))) {
        if (OB_UNLIKELY(OB_ITER_END != ret)) {
          LOG_WARN("fail to get next buffer", KR(ret));
        } else {
          ret = OB_SUCCESS;
          break;
        }
      } else if (OB_UNLIKELY(data_buffer.empty())) {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("unexpected empty data buffer", KR(ret));
      } else {
        int64_t complete_cnt = ignore_row_num - skip_line_count;
        int64_t complete_len = 0;
        if (OB_FAIL(ObLoadDataBase::pre_parse_lines(
              *data_buffer.file_buffer_, data_reader_.get_csv_parser(), data_reader_.is_end_file(),
              complete_len, complete_cnt))) {
          LOG_WARN("fail to fast_lines_parse", KR(ret));
        } else if (OB_UNLIKELY(0 == complete_len)) {
          ret = OB_NOT_SUPPORTED;
          LOG_WARN("direct-load does not support big row", KR(ret), "size",
                   data_buffer.get_data_length());
        } else {
          data_buffer.advance(complete_len);
          skip_line_count += complete_cnt;
        }
      }
    }
    LOG_INFO("LOAD DATA skip ignore rows", KR(ret), K(ignore_row_num), K(skip_line_count));
  }
  return ret;
}

/**
 * MultiFilesLoadExecutor
 */

int ObLoadDataDirectImpl::MultiFilesLoadExecutor::init(const LoadExecuteParam &execute_param,
                                                       LoadExecuteContext &execute_ctx,
                                                       const DataDescIterator &data_desc_iter)
{
  int ret = OB_SUCCESS;
  if (IS_INIT) {
    ret = OB_INIT_TWICE;
    LOG_WARN("ObLoadDataDirectImpl::MultiFilesLoadExecutor init twice", KR(ret), KP(this));
  } else if (OB_UNLIKELY(!execute_param.is_valid() || !execute_ctx.is_valid() ||
                         data_desc_iter.count() <= 1)) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid args", KR(ret), K(execute_param), K(execute_ctx), K(data_desc_iter));
  } else {
    if (OB_FAIL(inner_init(execute_param, execute_ctx, execute_param.parallel_))) {
      LOG_WARN("fail to init inner", KR(ret));
    } else if (OB_FAIL(data_desc_iter_.copy(data_desc_iter))) {
      LOG_WARN("fail to copy data desc iter", KR(ret));
    } else {
      is_inited_ = true;
    }
  }
  return ret;
}

int ObLoadDataDirectImpl::MultiFilesLoadExecutor::prepare_execute()
{
  int ret = OB_SUCCESS;
  for (int64_t i = 0; i < handle_resource_.count(); ++i) {
    handle_resource_.at(i)->session_id_ = i + 1;
  }
  return ret;
}

int ObLoadDataDirectImpl::MultiFilesLoadExecutor::get_next_task_handle(TaskHandle *&handle)
{
  int ret = OB_SUCCESS;
  DataDesc data_desc;
  if (OB_FAIL(data_desc_iter_.get_next_data_desc(data_desc))) {
    if (OB_UNLIKELY(OB_ITER_END != ret)) {
      LOG_WARN("fail to get next data desc", KR(ret));
    }
  } else if (OB_FAIL(fetch_task_handle(handle))) {
    LOG_WARN("fail to fetch task handle", KR(ret));
  } else {
    handle->task_id_ = task_controller_.get_next_task_id();
    handle->data_desc_ = data_desc;
    handle->start_line_no_ = 0;
    handle->result_.created_ts_ = ObTimeUtil::current_time();
  }
  return ret;
}

int ObLoadDataDirectImpl::MultiFilesLoadExecutor::fill_task(TaskHandle *handle,
                                                            ObTableLoadTask *task)
{
  int ret = OB_SUCCESS;
  if (OB_FAIL(task->set_processor<MultiFilesLoadTaskProcessor>(execute_param_, execute_ctx_, this,
                                                               handle->session_id_ - 1, handle))) {
    LOG_WARN("fail to set multi files load task processor", KR(ret));
  } else if (OB_FAIL(task->set_callback<FileLoadTaskCallback>(this, handle))) {
    LOG_WARN("fail to set file load task callback", KR(ret));
  }
  return ret;
}

/**
 * ObLoadDataDirectImpl
 */

ObLoadDataDirectImpl::ObLoadDataDirectImpl()
  : ctx_(nullptr), load_stmt_(nullptr)
{
}

ObLoadDataDirectImpl::~ObLoadDataDirectImpl()
{
}

int ObLoadDataDirectImpl::execute(ObExecContext &ctx, ObLoadDataStmt &load_stmt)
{
  int ret = OB_SUCCESS;
  ctx_ = &ctx;
  load_stmt_ = &load_stmt;
  const ObLoadArgument &load_args = load_stmt_->get_load_arguments();
  const int64_t original_timeout_us = THIS_WORKER.get_timeout_ts();

  if (OB_SUCC(ret)) {
    int64_t query_timeout = 0;
    ObSQLSessionInfo *session = nullptr;
    if (OB_ISNULL(session = ctx.get_my_session())) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("session is null", KR(ret));
    } else if (OB_FAIL(session->get_query_timeout(query_timeout))) {
      LOG_WARN("fail to get query timeout", KR(ret));
    } else {
      query_timeout = MAX(query_timeout, RPC_BATCH_INSERT_TIMEOUT_US);
      THIS_WORKER.set_timeout_ts(ObTimeUtility::current_time() + query_timeout);
    }
  }

  if (OB_FAIL(init_execute_param())) {
    LOG_WARN("fail to init execute param", KR(ret), K(ctx), K(load_stmt));
  } else if (OB_FAIL(init_execute_context())) {
    LOG_WARN("fail to init execute context", KR(ret), K(ctx), K(load_stmt));
  } else {
    LOG_INFO("LOAD DATA init finish", K_(execute_param), "file_path", load_args.file_name_);
    ObLoadDataStat *job_stat = execute_ctx_.job_stat_;
    OZ(ob_write_string(job_stat->allocator_, load_args.file_name_, job_stat->file_path_));
    job_stat->file_column_ = execute_param_.data_access_param_.file_column_num_;
    job_stat->load_mode_ = static_cast<int64_t>(execute_param_.dup_action_);
  }

  if (OB_SUCC(ret)) {
    FileLoadExecutor *file_load_executor = nullptr;
    DataDescIterator data_desc_iter;
    if (1 == load_args.file_iter_.count() && 0 == execute_param_.ignore_row_num_ &&
        SimpleDataSplitUtils::is_simple_format(execute_param_.data_access_param_.file_format_,
                                               execute_param_.data_access_param_.file_cs_type_)) {
      DataDesc data_desc;
      data_desc.filename_ = load_args.file_name_;
      if (OB_FAIL(SimpleDataSplitUtils::split(execute_param_.data_access_param_, data_desc,
                                              execute_param_.parallel_, data_desc_iter))) {
        LOG_WARN("fail to split data", KR(ret));
      }
    } else {
      if (OB_FAIL(data_desc_iter.copy(load_args.file_iter_))) {
        LOG_WARN("fail to copy file iter", KR(ret));
      }
    }
    if (OB_SUCC(ret)) {
      if (1 == data_desc_iter.count()) {
        // large file load
        if (OB_ISNULL(file_load_executor =
                        OB_NEWx(LargeFileLoadExecutor, execute_ctx_.allocator_))) {
          ret = OB_ALLOCATE_MEMORY_FAILED;
          LOG_WARN("fail to new LargeFileLoadExecutor", KR(ret));
        }
      } else if (data_desc_iter.count() > 1) {
        // multi files load
        if (OB_ISNULL(file_load_executor =
                        OB_NEWx(MultiFilesLoadExecutor, execute_ctx_.allocator_))) {
          ret = OB_ALLOCATE_MEMORY_FAILED;
          LOG_WARN("fail to new MultiFilesLoadExecutor", KR(ret));
        }
      }
    }
    if (OB_SUCC(ret)) {
      if (OB_FAIL(file_load_executor->init(execute_param_, execute_ctx_, data_desc_iter))) {
        LOG_WARN("fail to init file load executor", KR(ret));
      } else if (OB_FAIL(file_load_executor->execute())) {
        LOG_WARN("fail to execute file load", KR(ret));
      }
    }
    if (nullptr != file_load_executor) {
      file_load_executor->~FileLoadExecutor();
      file_load_executor = nullptr;
    }
  }

  if (OB_SUCC(ret)) {
    ObTableLoadResultInfo result_info;
    if (OB_FAIL(direct_loader_.commit(result_info))) {
      LOG_WARN("fail to commit direct loader", KR(ret));
    } else {
      ObPhysicalPlanCtx *phy_plan_ctx = execute_ctx_.exec_ctx_->get_physical_plan_ctx();
      phy_plan_ctx->set_affected_rows(result_info.rows_affected_);
      phy_plan_ctx->set_row_matched_count(result_info.records_);
      phy_plan_ctx->set_row_deleted_count(result_info.deleted_);
      phy_plan_ctx->set_row_duplicated_count(result_info.skipped_);
    }
  }

  direct_loader_.destroy();
  THIS_WORKER.set_timeout_ts(original_timeout_us);

  return ret;
}

int ObLoadDataDirectImpl::init_execute_param()
{
  int ret = OB_SUCCESS;
  const ObLoadArgument &load_args = load_stmt_->get_load_arguments();
  const ObLoadDataHint &hint = load_stmt_->get_hints();
  const ObIArray<ObLoadDataStmt::FieldOrVarStruct> &field_or_var_list =
    load_stmt_->get_field_or_var_list();
  execute_param_.tenant_id_ = load_args.tenant_id_;
  execute_param_.database_id_ = load_args.database_id_;
  execute_param_.table_id_ = load_args.table_id_;
  execute_param_.database_name_ = load_args.database_name_;
  execute_param_.table_name_ = load_args.table_name_;
  execute_param_.combined_name_ = load_args.combined_name_;
  execute_param_.ignore_row_num_ = load_args.ignore_rows_;
  execute_param_.dup_action_ = load_args.dupl_action_;
  // parallel_
  if (OB_SUCC(ret)) {
    ObTenant *tenant = nullptr;
    int64_t hint_parallel = 0;
    if (OB_FAIL(hint.get_value(ObLoadDataHint::PARALLEL_THREADS, hint_parallel))) {
      LOG_WARN("fail to get value of PARALLEL_THREADS", KR(ret), K(hint));
    } else if (OB_FAIL(GCTX.omt_->get_tenant(execute_param_.tenant_id_, tenant))) {
      LOG_WARN("fail to get tenant handle", KR(ret), K(execute_param_.tenant_id_));
    } else {
      hint_parallel = hint_parallel > 0 ? hint_parallel : DEFAULT_PARALLEL_THREAD_COUNT;
      hint_parallel = load_args.file_iter_.count() > 1
                        ? MIN(hint_parallel, load_args.file_iter_.count())
                        : hint_parallel;
      execute_param_.parallel_ = MIN(hint_parallel, (int64_t)tenant->unit_max_cpu());
      execute_param_.data_mem_usage_limit_ =
        MIN(execute_param_.parallel_ * 2, MAX_DATA_MEM_USAGE_LIMIT);
    }
  }
  // batch_row_count_
  if (OB_SUCC(ret)) {
    int64_t hint_batch_size = 0;
    if (OB_FAIL(hint.get_value(ObLoadDataHint::BATCH_SIZE, hint_batch_size))) {
      LOG_WARN("fail to get value of BATCH_SIZE", KR(ret), K(hint));
    } else {
      execute_param_.batch_row_count_ =
        hint_batch_size > 0 ? hint_batch_size : DEFAULT_BUFFERRED_ROW_COUNT;
    }
  }
  // need_sort_
  if (OB_SUCC(ret)) {
    int64_t append = 0;
    int64_t enable_direct = 0;
    int64_t hint_need_sort = 0;
    if (OB_FAIL(hint.get_value(ObLoadDataHint::APPEND, append))) {
      LOG_WARN("fail to get value of APPEND", K(ret));
    } else if (OB_FAIL(hint.get_value(ObLoadDataHint::ENABLE_DIRECT, enable_direct))) {
      LOG_WARN("fail to get value of ENABLE_DIRECT", K(ret));
    } else if (OB_FAIL(hint.get_value(ObLoadDataHint::NEED_SORT, hint_need_sort))) {
      LOG_WARN("fail to get value of NEED_SORT", KR(ret), K(hint));
    } else if (enable_direct != 0) {
      execute_param_.need_sort_ = hint_need_sort > 0 ? true : false;
    } else {
      execute_param_.need_sort_ = true;
    }
  }
  // sql_mode_
  if (OB_SUCC(ret)) {
    ObSQLSessionInfo *session = nullptr;
    uint64_t sql_mode;
    if (OB_ISNULL(session = ctx_->get_my_session())) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("session is null", KR(ret));
    } else if (OB_FAIL(session->get_sys_variable(SYS_VAR_SQL_MODE, sql_mode))) {
      LOG_WARN("fail to get sys variable", K(ret));
    } else {
      execute_param_.sql_mode_ = sql_mode;
    }
  }
  // online_opt_stat_gather_
  if (OB_SUCC(ret)) {
    int64_t append = 0;
    int64_t gather_optimizer_statistics = 0 ;
    ObSQLSessionInfo *session = nullptr;
    ObObj obj;
    if (OB_ISNULL(session = ctx_->get_my_session())) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("session is null", KR(ret));
    } else if (OB_FAIL(session->get_sys_variable(SYS_VAR__OPTIMIZER_GATHER_STATS_ON_LOAD, obj))) {
      LOG_WARN("fail to get sys variable", K(ret));
    } else if (OB_FAIL(hint.get_value(ObLoadDataHint::APPEND, append))) {
      LOG_WARN("fail to get value of APPEND", K(ret));
    } else if (OB_FAIL(hint.get_value(ObLoadDataHint::GATHER_OPTIMIZER_STATISTICS, gather_optimizer_statistics))) {
      LOG_WARN("fail to get value of APPEND", K(ret));
    } else if (((append != 0) || (gather_optimizer_statistics != 0)) && obj.get_bool()) {
      execute_param_.online_opt_stat_gather_  = true;
    } else {
      execute_param_.online_opt_stat_gather_ = false;
    }
  }
  // max_error_rows_
  if (OB_SUCC(ret)) {
    int64_t append = 0;
    int64_t enable_direct = 0;
    int64_t hint_error_rows = 0;
    if (OB_FAIL(hint.get_value(ObLoadDataHint::APPEND, append))) {
      LOG_WARN("fail to get value of APPEND", K(ret));
    } else if (OB_FAIL(hint.get_value(ObLoadDataHint::ENABLE_DIRECT, enable_direct))) {
      LOG_WARN("fail to get value of ENABLE_DIRECT", K(ret));
    } else if (OB_FAIL(hint.get_value(ObLoadDataHint::ERROR_ROWS, hint_error_rows))) {
      LOG_WARN("fail to get value of ERROR_ROWS", KR(ret), K(hint));
    } else if (enable_direct != 0) {
      execute_param_.max_error_rows_ = hint_error_rows;
    } else {
      execute_param_.max_error_rows_ = 0;
    }
  }
  // data_access_param_
  if (OB_SUCC(ret)) {
    DataAccessParam &data_access_param = execute_param_.data_access_param_;
    data_access_param.file_location_ = load_args.load_file_storage_;
    data_access_param.file_column_num_ = field_or_var_list.count();
    data_access_param.file_format_ = load_stmt_->get_data_struct_in_file();
    data_access_param.file_cs_type_ = load_args.file_cs_type_;
    data_access_param.access_info_ = load_args.access_info_;
  }
  // store_column_idxs_
  if (OB_SUCC(ret)) {
    if (OB_FAIL(init_store_column_idxs(execute_param_.store_column_idxs_))) {
      LOG_WARN("fail to init store column idxs", KR(ret));
    }
  }
  return ret;
}

int ObLoadDataDirectImpl::init_store_column_idxs(ObIArray<int64_t> &store_column_idxs)
{
  int ret = OB_SUCCESS;
  const ObLoadArgument &load_args = load_stmt_->get_load_arguments();
  const ObIArray<ObLoadDataStmt::FieldOrVarStruct> &field_or_var_list =
    load_stmt_->get_field_or_var_list();
  const uint64_t tenant_id = load_args.tenant_id_;
  const uint64_t table_id = load_args.table_id_;
  ObSchemaGetterGuard schema_guard;
  const ObTableSchema *table_schema = nullptr;
  ObSEArray<ObColDesc, 64> column_descs;
  if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_schema_guard(tenant_id,
                                                                                  schema_guard))) {
    LOG_WARN("fail to get tenant schema guard", KR(ret), K(tenant_id));
  } else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, table_id, table_schema))) {
    LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id));
  } else if (OB_ISNULL(table_schema)) {
    ret = OB_TABLE_NOT_EXIST;
    LOG_WARN("table not exist", KR(ret), K(tenant_id), K(table_id));
  } else if (OB_FAIL(table_schema->get_column_ids(column_descs))) {
    STORAGE_LOG(WARN, "fail to get column descs", KR(ret), KPC(table_schema));
  } else {
    bool found_column = true;
    for (int64_t i = 0; OB_SUCC(ret) && OB_LIKELY(found_column) && i < column_descs.count(); ++i) {
      const ObColDesc &col_desc = column_descs.at(i);
      const ObColumnSchemaV2 *col_schema = table_schema->get_column_schema(col_desc.col_id_);
      if (OB_ISNULL(col_schema)) {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("unexpected null column schema", KR(ret), K(col_desc));
      } else {
        found_column = col_schema->is_hidden();
      }
      // 在源数据的列数组中找到对应的列
      for (int64_t j = 0; OB_SUCC(ret) && OB_LIKELY(!found_column) && j < field_or_var_list.count();
           ++j) {
        const ObLoadDataStmt::FieldOrVarStruct &field_or_var_struct = field_or_var_list.at(j);
        if (col_desc.col_id_ == field_or_var_struct.column_id_) {
          found_column = true;
          if (OB_FAIL(store_column_idxs.push_back(j))) {
            LOG_WARN("fail to push back column desc", KR(ret), K(store_column_idxs), K(i),
                     K(col_desc), K(j), K(field_or_var_struct));
          }
        }
      }
    }
    if (OB_SUCC(ret) && OB_UNLIKELY(!found_column)) {
      ret = OB_NOT_SUPPORTED;
      LOG_WARN("not supported incomplete column data", KR(ret), K(store_column_idxs),
               K(column_descs), K(field_or_var_list));
    }
  }
  return ret;
}

int ObLoadDataDirectImpl::init_execute_context()
{
  int ret = OB_SUCCESS;
  execute_ctx_.exec_ctx_ = ctx_;
  execute_ctx_.allocator_ = &ctx_->get_allocator();
  ObTableLoadParam load_param;
  load_param.tenant_id_ = execute_param_.tenant_id_;
  load_param.table_id_ = execute_param_.table_id_;
  load_param.session_count_ = execute_param_.parallel_;
  load_param.batch_size_ = execute_param_.batch_row_count_;
  load_param.max_error_row_count_ = execute_param_.max_error_rows_;
  load_param.column_count_ = execute_param_.store_column_idxs_.count();
  load_param.need_sort_ = execute_param_.need_sort_;
  load_param.data_type_ = ObTableLoadDataType::OBJ_ARRAY;
  load_param.dup_action_ = execute_param_.dup_action_;
  load_param.sql_mode_ = execute_param_.sql_mode_;
  load_param.px_mode_ = false;
  load_param.online_opt_stat_gather_ = execute_param_.online_opt_stat_gather_;
  if (OB_FAIL(direct_loader_.init(load_param,
      execute_param_.store_column_idxs_, &execute_ctx_))) {
    LOG_WARN("fail to init direct loader", KR(ret));
  } else if (OB_FAIL(init_logger())) {
    LOG_WARN("fail to init logger", KR(ret));
  }
  if (OB_SUCC(ret)) {
    execute_ctx_.direct_loader_ = &direct_loader_;
    execute_ctx_.job_stat_ = direct_loader_.get_job_stat();
    execute_ctx_.logger_ = &logger_;
  }
  return ret;
}

int ObLoadDataDirectImpl::init_logger()
{
  int ret = OB_SUCCESS;
  ObString load_info;
  char *buf = nullptr;
  const int64_t buf_len = MAX_BUFFER_SIZE;
  int64_t pos = 0;
  ObSQLSessionInfo *session = nullptr;
  if (OB_ISNULL(session = ctx_->get_my_session())) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("session is null", KR(ret));
  } else if (OB_ISNULL(buf = static_cast<char *>(ctx_->get_allocator().alloc(buf_len)))) {
    ret = OB_ALLOCATE_MEMORY_FAILED;
    LOG_WARN("fail to allocate memory", KR(ret), K(buf_len));
  } else {
    const ObString &cur_query_str = session->get_current_query_string();
    const ObLoadArgument &load_args = load_stmt_->get_load_arguments();
    int64_t current_time = ObTimeUtil::current_time();
    OZ(databuff_printf(buf, buf_len, pos,
                       "Tenant name:\t%.*s\n"
                       "File name:\t%.*s\n"
                       "Into table:\t%.*s\n"
                       "Parallel:\t%ld\n"
                       "Batch size:\t%ld\n"
                       "SQL trace:\t%s\n",
                       session->get_tenant_name().length(), session->get_tenant_name().ptr(),
                       load_args.file_name_.length(), load_args.file_name_.ptr(),
                       load_args.combined_name_.length(), load_args.combined_name_.ptr(),
                       execute_param_.parallel_, execute_param_.batch_row_count_,
                       ObCurTraceId::get_trace_id_str()));
    OZ(databuff_printf(buf, buf_len, pos, "Start time:\t"));
    OZ(ObTimeConverter::datetime_to_str(current_time, TZ_INFO(session), ObString(),
                                        MAX_SCALE_FOR_TEMPORAL, buf, buf_len, pos, true));
    OZ(databuff_printf(buf, buf_len, pos, "\n"));
    OZ(databuff_printf(buf, buf_len, pos, "Load query: \n%.*s\n", cur_query_str.length(),
                       cur_query_str.ptr()));
    OX(load_info.assign_ptr(buf, pos));
  }
  if (OB_SUCC(ret)) {
    if (OB_FAIL(logger_.init(load_info))) {
      LOG_WARN("fail to init logger", KR(ret));
    }
  }
  return ret;
}

} // namespace sql
} // namespace oceanbase
